package com.example;

import org.apache.pulsar.client.api.*;

import java.util.concurrent.CompletableFuture;

/**
 * Minimal Apache Pulsar consumer demo.
 *
 * <p>Connects to a broker at {@code pulsar://localhost:6650} and shows three ways of
 * receiving {@code String} messages from {@link #TOPIC_NAME}: a push-style message
 * listener, a blocking receive loop, and a single asynchronous receive. Only one of the
 * three is invoked from {@link #main(String[])} at a time.
 *
 * <p>Created: 2021/11/12 20:21
 *
 * @author linfeng
 */
public class SimpleConsumer {

    /** Shared client, initialised once in {@link #main(String[])} before any consumer is built. */
    private static PulsarClient client;

    private static final String TOPIC_NAME = "TEST-TOPIC";

    /** Subscription shared by all three demo consumers. */
    private static final String SUBSCRIPTION_NAME = "消费者-张三";

    /**
     * Builds the Pulsar client and runs the selected consumer demo.
     *
     * @param args unused
     * @throws PulsarClientException if the client or the consumer cannot be created
     * @throws InterruptedException declared for callers; not thrown by the current body
     */
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Alternative demos: consumeMessageWithListener() or consumeMessageWithLoop().
        consumeMessageAsync();
    }

    /**
     * Subscribes with a {@link MessageListener} so that messages are pushed to the callback.
     *
     * <p>Each message is printed and then acknowledged; if handling fails it is negatively
     * acknowledged so that it can be redelivered.
     *
     * @throws PulsarClientException if the subscription cannot be established
     */
    private static void consumeMessageWithListener() throws PulsarClientException {
        client.newConsumer(Schema.STRING)
                .topic(TOPIC_NAME)
                .subscriptionName(SUBSCRIPTION_NAME)
                .messageListener((MessageListener<String>) (consumer, msg) -> {
                    try {
                        // Decode through the schema rather than the platform default charset.
                        System.out.println("收到消息：" + msg.getValue());
                        // Acknowledge so the message is not left pending on the subscription.
                        consumer.acknowledge(msg);
                    } catch (Exception e) {
                        System.err.println("Failed to process message " + msg.getMessageId() + ": " + e);
                        consumer.negativeAcknowledge(msg);
                    }
                })
                .subscribe();
    }

    /**
     * Subscribes and then receives messages forever with blocking {@code receive()} calls.
     *
     * <p>This method never returns normally; it only exits when {@code subscribe()} or
     * {@code receive()} throws.
     *
     * @throws PulsarClientException if subscribing or receiving fails
     */
    private static void consumeMessageWithLoop() throws PulsarClientException {
        Consumer<String> stringConsumer = client.newConsumer(Schema.STRING)
                .topic(TOPIC_NAME)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscribe();

        while (true) {
            Message<String> message = stringConsumer.receive();

            try {
                System.out.println(message.getValue());
                // Acknowledge the message so that the broker can delete it.
                stringConsumer.acknowledge(message.getMessageId());
            } catch (Exception e) {
                // Processing failed: report it and let the message be redelivered.
                System.err.println("Failed to process message " + message.getMessageId() + ": " + e);
                stringConsumer.negativeAcknowledge(message.getMessageId());
            }
        }
    }

    /**
     * Subscribes and requests a single message asynchronously.
     *
     * <p>The method returns as soon as the callback is registered. When the receive
     * completes, the message is printed and acknowledged. If the receive itself fails,
     * the error is reported; no negative acknowledgement is sent in that case because
     * no message was delivered.
     *
     * @throws PulsarClientException if the subscription cannot be established
     */
    private static void consumeMessageAsync() throws PulsarClientException {
        Consumer<String> stringConsumer = client.newConsumer(Schema.STRING)
                .topic(TOPIC_NAME)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscribe();

        stringConsumer.receiveAsync().whenComplete((stringMessage, throwable) -> {
            if (throwable != null) {
                // The message argument is null on failure, so there is nothing to nack.
                System.err.println("Failed to receive message: " + throwable);
                return;
            }

            try {
                System.out.println(stringMessage.getValue());
                stringConsumer.acknowledge(stringMessage.getMessageId());
            } catch (Exception e) {
                // Processing or acknowledging failed: report it and allow redelivery.
                e.printStackTrace();
                stringConsumer.negativeAcknowledge(stringMessage.getMessageId());
            }
        });
    }
}
